package com.atguigu.flink.chapter04;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * TODO slot共享组
 *
 * @author cjp
 * @version 1.0
 * @date 2021/8/6 11:16
 */
public class Flink02_WC_SlotSharingGroup {
    public static void main(String[] args) throws Exception {
        // 0.获取 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.读取数据
//        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
        DataStreamSource<String> socketDS = env.socketTextStream("hadoop1", 9999);
        // 2.处理数据 3.输出(打印)
        socketDS
                .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out) -> {
                    String[] words = value.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING,Types.INT))
                .keyBy(0)
                .sum(1).slotSharingGroup("haha")
                .print();

        // 4.启动
        env.execute("");

    }
}
/*
    1、只有属于同一个 slot共享组 的 subtask，才可以 共享同一个 slot
    2、属于同一个算子的 subtask，不能共享同一个 slot

 */